/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.phoenix.compile;

import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
import org.apache.phoenix.execute.AggregatePlan;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
import org.apache.phoenix.parse.PFunction;
import org.apache.phoenix.parse.PSchema;
import org.apache.phoenix.parse.SelectStatement;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.AmbiguousColumnException;
import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
import org.apache.phoenix.schema.ColumnNotFoundException;
import org.apache.phoenix.schema.ColumnRef;
import org.apache.phoenix.schema.FunctionNotFoundException;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PColumnFamily;
import org.apache.phoenix.schema.SchemaNotFoundException;
import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.TransactionUtil;

import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;

/**
 * Class that compiles a plan to update data values after a DDL command executes. TODO: get rid
 * of this ugly code and just go through the standard APIs. The only time we may still need this
 * is to manage updating the empty key value, as we sometimes need to "go back through time" to
 * adjust this.
 * @since 0.1
 */
public class PostDDLCompiler {
  private final PhoenixConnection connection;
  private final Scan scan;

  public PostDDLCompiler(PhoenixConnection connection) {
    this(connection, new Scan());
  }

  public PostDDLCompiler(PhoenixConnection connection, Scan scan) {
    this.connection = connection;
    this.scan = scan;
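    // Mark the scan as an ungrouped aggregate so the server-side ungrouped aggregation
    // coprocessor handles it; execute() adds further attributes telling it what to delete or
    // update.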
    scan.setAttribute(BaseScannerRegionObserverConstants.UNGROUPED_AGG, QueryConstants.TRUE);
  }

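  /**
   * Compiles a plan that performs post-DDL data maintenance on the given tables. The parameter
   * semantics described here are inferred from how execute() interprets them. A minimal usage
   * sketch (assuming a resolved {@code tableRef} for a just-dropped table; the empty delete
   * list requests deletion of all rows):
   *
   * <pre>
   * MutationPlan plan = new PostDDLCompiler(connection).compile(
   *   Collections.singletonList(tableRef), null, null,
   *   Collections.emptyList(), HConstants.LATEST_TIMESTAMP);
   * plan.execute();
   * </pre>
   *
   * @param tableRefs  the tables to operate on
   * @param emptyCF    if non-null, the column family whose empty key value should be written
   * @param projectCFs the column families to project into the scan, or null to project all of
   *                   the table's families
   * @param deleteList columns whose values should be deleted; an empty list requests deletion
   *                   of entire rows, and null requests no deletes
   * @param timestamp  the upper bound of the scan's time range
   */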
  public MutationPlan compile(final List<TableRef> tableRefs, final byte[] emptyCF,
    final List<byte[]> projectCFs, final List<PColumn> deleteList, final long timestamp)
    throws SQLException {
    PhoenixStatement statement = new PhoenixStatement(connection);
    final StatementContext context = new StatementContext(statement,
      new MultipleTableRefColumnResolver(tableRefs), scan, new SequenceManager(statement));
    return new PostDDLMutationPlan(context, tableRefs, timestamp, emptyCF, deleteList, projectCFs);
  }

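  /**
   * Minimal resolver that exposes the supplied tables but supports no table, column, or schema
   * resolution; it exists only so the StatementContext in compile() can be constructed.
   */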
  private static class MultipleTableRefColumnResolver implements ColumnResolver {

    private final List<TableRef> tableRefs;

    public MultipleTableRefColumnResolver(List<TableRef> tableRefs) {
      this.tableRefs = tableRefs;
    }

    @Override
    public List<TableRef> getTables() {
      return tableRefs;
    }

    @Override
    public TableRef resolveTable(String schemaName, String tableName) throws SQLException {
      throw new UnsupportedOperationException();
    }

    @Override
    public ColumnRef resolveColumn(String schemaName, String tableName, String colName)
      throws SQLException {
      throw new UnsupportedOperationException();
    }

    @Override
    public List<PFunction> getFunctions() {
      return Collections.emptyList();
    }

    @Override
    public PFunction resolveFunction(String functionName) throws SQLException {
      throw new FunctionNotFoundException(functionName);
    }

    @Override
    public boolean hasUDFs() {
      return false;
    }

    @Override
    public PSchema resolveSchema(String schemaName) throws SQLException {
      throw new SchemaNotFoundException(schemaName);
    }

    @Override
    public List<PSchema> getSchemas() {
      throw new UnsupportedOperationException();
    }

  }

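  /**
   * Plan that, for each table, runs an ungrouped COUNT aggregate over the projected column
   * families. The scan attributes set in execute() direct the server-side ungrouped aggregation
   * coprocessor to perform the actual deletes / empty-KV updates, so the aggregate result is
   * the number of rows affected.
   */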
  private class PostDDLMutationPlan extends BaseMutationPlan {

    private final StatementContext context;
    private final List<TableRef> tableRefs;
    private final long timestamp;
    private final byte[] emptyCF;
    private final List<PColumn> deleteList;
    private final List<byte[]> projectCFs;

    public PostDDLMutationPlan(StatementContext context, List<TableRef> tableRefs, long timestamp,
      byte[] emptyCF, List<PColumn> deleteList, List<byte[]> projectCFs) {
      super(context, Operation.UPSERT);
      this.context = context;
      this.tableRefs = tableRefs;
      this.timestamp = timestamp;
      this.emptyCF = emptyCF;
      this.deleteList = deleteList;
      this.projectCFs = projectCFs;
    }

    @Override
    public MutationState execute() throws SQLException {
      if (tableRefs.isEmpty()) {
        return new MutationState(0, 1000, connection);
      }
      boolean wasAutoCommit = connection.getAutoCommit();
      try {
        connection.setAutoCommit(true);
        SQLException sqlE = null;
        /*
         * Handles:
         * 1) deletion of all rows for a DROP TABLE and, subsequently, of all rows for a DROP
         *    INDEX;
         * 2) deletion of all column values for an ALTER TABLE DROP COLUMN;
         * 3) updating the necessary rows to have an empty KV;
         * 4) updating table stats.
         */
        long totalMutationCount = 0;
        for (final TableRef tableRef : tableRefs) {
          Scan scan = ScanUtil.newScan(context.getScan());
          SelectStatement select = SelectStatement.COUNT_ONE;
          // Resolve columns against this iteration's tableRef rather than the multi-table
          // resolver used to build the outer context.
          ColumnResolver resolver = new SingleTableRefColumnResolver(tableRef);
          PhoenixStatement statement = new PhoenixStatement(connection);
          StatementContext context =
            new StatementContext(statement, resolver, scan, new SequenceManager(statement));
          long ts = timestamp;
          // FIXME: DDL operations aren't transactional, so we're basing the timestamp on a
          // server timestamp. Not sure what the fix should be. We don't need conflict detection
          // nor filtering of invalid transactions in this case, so maybe this is ok.
          if (ts != HConstants.LATEST_TIMESTAMP && tableRef.getTable().isTransactional()) {
            ts = TransactionUtil.convertToNanoseconds(ts);
          }
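          // Cap the scan's time range at ts so only cells written up to that point are affected.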
          ScanUtil.setTimeRange(scan, scan.getTimeRange().getMin(), ts);
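          // Tell the coprocessor which family (and qualifier) holds the empty key value so it
          // can rewrite it for existing rows.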
          if (emptyCF != null) {
            scan.setAttribute(BaseScannerRegionObserverConstants.EMPTY_CF, emptyCF);
            scan.setAttribute(BaseScannerRegionObserverConstants.EMPTY_COLUMN_QUALIFIER,
              EncodedColumnsUtil.getEmptyKeyValueInfo(tableRef.getTable()).getFirst());
          }
          ServerCache cache = null;
          try {
            if (deleteList != null) {
              if (deleteList.isEmpty()) {
                scan.setAttribute(BaseScannerRegionObserverConstants.DELETE_AGG,
                  QueryConstants.TRUE);
                // In the case of a row deletion, index metadata would be added here so that
                // mutable secondary indexing works; see the commented-out code below.
                /*
                 * TODO: we currently manually run a scan to delete the index data here.
                 * ImmutableBytesWritable ptr = context.getTempPtr();
                 * tableRef.getTable().getIndexMaintainers(ptr);
                 * if (ptr.getLength() > 0) {
                 *   IndexMetaDataCacheClient client =
                 *     new IndexMetaDataCacheClient(connection, tableRef);
                 *   cache = client.addIndexMetadataCache(context.getScanRanges(), ptr);
                 *   byte[] uuidValue = cache.getId();
                 *   scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
                 * }
                 */
              } else {
                // In the case of the empty key value column family changing, do not send the index
                // metadata, as we're currently managing this from the client. It's possible for the
                // data empty column family to stay the same, while the index empty column family
                // changes.
                PColumn column = deleteList.get(0);
                byte[] cq = column.getColumnQualifierBytes();
                if (emptyCF == null) {
                  scan.addColumn(column.getFamilyName().getBytes(), cq);
                }
                scan.setAttribute(BaseScannerRegionObserverConstants.DELETE_CF,
                  column.getFamilyName().getBytes());
                scan.setAttribute(BaseScannerRegionObserverConstants.DELETE_CQ, cq);
              }
            }
            List<byte[]> columnFamilies =
              Lists.newArrayListWithExpectedSize(tableRef.getTable().getColumnFamilies().size());
            if (projectCFs == null) {
              for (PColumnFamily family : tableRef.getTable().getColumnFamilies()) {
                columnFamilies.add(family.getName().getBytes());
              }
            } else {
              columnFamilies.addAll(projectCFs);
            }
            // Need to project all column families into the scan, since we haven't yet created our
            // empty key value
            RowProjector projector = ProjectionCompiler.compile(context, SelectStatement.COUNT_ONE,
              GroupBy.EMPTY_GROUP_BY);
            context.getAggregationManager().compile(context, GroupBy.EMPTY_GROUP_BY);
            // Explicitly project these column families and don't project the empty key value,
            // since at this point we haven't added the empty key value everywhere.
            scan.getFamilyMap().clear();
            for (byte[] family : columnFamilies) {
              scan.addFamily(family);
            }
            projector = new RowProjector(projector, false);
            // Ignore exceptions due to not being able to resolve any view columns,
            // as this just means the view is invalid. Continue on and try to perform
            // any other Post DDL operations.
            try {
              // Since dropping a VIEW does not affect the underlying data, we do
              // not need to pass through the view statement here.
              WhereCompiler.compile(context, select); // Push where clause into scan
            } catch (ColumnFamilyNotFoundException | ColumnNotFoundException
              | AmbiguousColumnException e) {
              continue;
            }
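            // Run the aggregate; the coprocessor applies the deletes/updates server-side and
            // returns the number of affected rows as the COUNT result.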
            QueryPlan plan = new AggregatePlan(context, select, tableRef, projector, null, null,
              OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null, null);
            try {
              ResultIterator iterator = plan.iterator();
              try {
                Tuple row = iterator.next();
                ImmutableBytesWritable ptr = context.getTempPtr();
                totalMutationCount +=
                  (Long) projector.getColumnProjector(0).getValue(row, PLong.INSTANCE, ptr);
              } catch (SQLException e) {
                sqlE = e;
              } finally {
                try {
                  iterator.close();
                } catch (SQLException e) {
                  if (sqlE == null) {
                    sqlE = e;
                  } else {
                    sqlE.setNextException(e);
                  }
                } finally {
                  if (sqlE != null) {
                    throw sqlE;
                  }
                }
              }
            } catch (TableNotFoundException e) {
              // Ignore and continue, as HBase throws when table hasn't been written to
              // FIXME: Remove if this is fixed in 0.96
            }
          } finally {
            if (cache != null) { // Remove server cache if there is one
              cache.close();
            }
          }

        }
        final long count = totalMutationCount;
        return new MutationState(1, 1000, connection) {
          @Override
          public long getUpdateCount() {
            return count;
          }
        };
      } finally {
        if (!wasAutoCommit) {
          connection.setAutoCommit(wasAutoCommit);
        }
      }
    }

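    /**
     * Resolver scoped to a single table: a "table name" qualifier is interpreted as a column
     * family name, which is all the WHERE and projection compilation above needs.
     */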
    private class SingleTableRefColumnResolver implements ColumnResolver {
      private final TableRef tableRef;

      public SingleTableRefColumnResolver(TableRef tableRef) {
        this.tableRef = tableRef;
      }

      @Override
      public List<TableRef> getTables() {
        return Collections.singletonList(tableRef);
      }

      @Override
      public List<PFunction> getFunctions() {
        return Collections.emptyList();
      }

      @Override
      public TableRef resolveTable(String schemaName, String tableName) throws SQLException {
        throw new UnsupportedOperationException();
      }

      @Override
      public ColumnRef resolveColumn(String schemaName, String tableName, String colName)
        throws SQLException {
        PColumn column = tableName != null
          ? tableRef.getTable().getColumnFamily(tableName).getPColumnForColumnName(colName)
          : tableRef.getTable().getColumnForColumnName(colName);
        return new ColumnRef(tableRef, column.getPosition());
      }

      @Override
      public PFunction resolveFunction(String functionName) throws SQLException {
        throw new UnsupportedOperationException();
      }

      @Override
      public boolean hasUDFs() {
        return false;
      }

      @Override
      public List<PSchema> getSchemas() {
        throw new UnsupportedOperationException();
      }

      @Override
      public PSchema resolveSchema(String schemaName) throws SQLException {
        throw new SchemaNotFoundException(schemaName);
      }
    }
  }
}
